Adding new logic for extended sessions to enable the DTS use-case#1333
Adding new logic for extended sessions to enable the DTS use-case#1333
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds an explicit “end session” hook to the extended sessions (sticky sessions) flow so providers (e.g., DTS) can release a session/work item when the extended session finishes.
Changes:
- Introduces
IOrchestrationSession.EndSessionAsync()as a new session-termination callback. - Updates orchestration and entity dispatchers to call
EndSessionAsync()when an extended session ends (after releasing the in-proc concurrency lock). - Adds a no-op
EndSessionAsync()implementation for the Azure StorageOrchestrationSession.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/DurableTask.Core/TaskOrchestrationDispatcher.cs | Calls EndSessionAsync() when releasing an extended session; adjusts rewind handling logic. |
| src/DurableTask.Core/TaskEntityDispatcher.cs | Calls EndSessionAsync() when releasing an extended session in the entity dispatcher. |
| src/DurableTask.Core/IOrchestrationSession.cs | Adds the new EndSessionAsync() method to the public session interface. |
| src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs | Implements EndSessionAsync() as a no-op for the Azure Storage provider. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -553,10 +554,6 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work | |||
| orchestratorMessages.AddRange(subOrchestrationRewindMessages); | |||
| workItem.OrchestrationRuntimeState = newRuntimeState; | |||
| runtimeState = newRuntimeState; | |||
There was a problem hiding this comment.
In the RewindOrchestration decision path, the dispatcher no longer marks the work item as rewinding/interrupted. This can cause OnProcessWorkItemAsync to return false, so the extended session loop continues fetching/processing while the orchestration history/runtime state was manually rewritten by ProcessRewindOrchestrationDecision. Previously the code intentionally ended the extended session in this case to avoid persisting inconsistent cached session state; please restore an equivalent interrupt/end-session signal here (e.g., set isRewinding/isInterrupted) so the session terminates after processing a rewind decision.
| runtimeState = newRuntimeState; | |
| runtimeState = newRuntimeState; | |
| // Mark this session as rewinding/interrupted so the extended session loop terminates | |
| isRewinding = true; | |
| isInterrupted = true; |
| /// <summary> | ||
| /// Ends the session. | ||
| /// </summary> | ||
| /// <returns>A task that completes when the session has been ended.</returns> | ||
| Task EndSessionAsync(); |
There was a problem hiding this comment.
IOrchestrationSession is a public interface in a netstandard2.0 library; adding EndSessionAsync() is a breaking change for any external providers that implement this interface (they will fail to compile until they add the new method). If backward compatibility is required, consider introducing a new optional interface (e.g., IOrchestrationSessionWithEnd/IOrchestrationSession2) and feature-detect it in the dispatchers, or provide an alternative non-interface-based extension point.
| @@ -30,5 +31,11 @@ public interface IOrchestrationSession | |||
| /// and the dispatcher will shut down the session. | |||
| /// </remarks> | |||
| Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem); | |||
There was a problem hiding this comment.
With #nullable enable enabled in this file, FetchNewOrchestrationMessagesAsync is documented to allow returning null to end the session, but the signature returns a non-nullable Task<IList<TaskMessage>>. Consider updating the return type annotation to Task<IList<TaskMessage>?> (nullable reference annotation only) so implementers using nullable analysis don’t get misleading contracts/warnings and the signature matches the documented behavior.
| Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem); | |
| Task<IList<TaskMessage>?> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem); |
| "OnProcessWorkItemSession-Release", | ||
| $"Releasing extended session after {processCount} batch(es)."); | ||
| this.concurrentSessionLock.Release(); | ||
| await workItem.Session.EndSessionAsync(); | ||
| } |
There was a problem hiding this comment.
The dispatchers now rely on IOrchestrationSession.EndSessionAsync() being invoked when an extended session ends (after releasing the concurrentSessionLock). Please add/adjust automated coverage that asserts EndSessionAsync() is called in the extended-session flow (and not skipped on important exit paths like completion/interrupt), since this call is necessary for the DTS scenario and regressions here are easy to miss in existing integration tests.
This PR introduces new logic in the extended sessions flow that is necessary for the DTS use-case. In particular, it introduces a new
EndSessionAsynccall which will allow DTS to release the work item once the extended session ends.